/*-------------------------------------------------------------------------
 *
 * resqueue.c
 *	  POSTGRES internals code for resource queues and locks.
 *
 *
 * Copyright (c) 2006-2008, Greenplum inc.
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <time.h>

#include "pgstat.h"
#include "access/heapam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "catalog/pg_type.h"
#include "cdb/cdbgang.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/lock.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/resscheduler.h"
#include "cdb/memquota.h"

static void ResCleanUpLock(LOCK *lock, PROCLOCK *proclock, uint32 hashcode, bool wakeupNeeded);

static ResPortalIncrement *ResIncrementAdd(ResPortalIncrement *incSet, PROCLOCK *proclock, ResourceOwner owner);
static bool ResIncrementRemove(ResPortalTag *portaltag);

static void ResWaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, ResPortalIncrement *incrementSet);

static void ResLockUpdateLimit(LOCK *lock, PROCLOCK *proclock, ResPortalIncrement *incrementSet, bool increment, bool inError);

static void				ResGrantLock(LOCK *lock, PROCLOCK *proclock);
static bool				ResUnGrantLock(LOCK *lock, PROCLOCK *proclock);


/*
 * Global Variables
 */
static HTAB *ResPortalIncrementHash;		/* Hash of resource increments. */
static HTAB *ResQueueHash;					/* Hash of resource queues. */


/*
 * Record structure holding the to be exposed per queue data, used by
 * pg_resqueue_status().
 */
typedef struct
{
	Oid		queueid;
	float4	queuecountthreshold;
	float4	queuecostthreshold;
	float4  queuememthreshold;
	float4	queuecountvalue;
	float4	queuecostvalue;
	float4  queuememvalue;
	int		queuewaiters;
	int		queueholders;
}	QueueStatusRec;


/*
 * Function context for data persisting over repeated calls, used by 
 * pg_resqueue_status().
 */
typedef struct
{
	QueueStatusRec	*record;
	int				numRecords;
	
}	QueueStatusContext;

static void BuildQueueStatusContext(QueueStatusContext *fctx);


/*
 * ResLockAcquire -- acquire a resource lock.
 *
 * Notes and critisms:
 *
 *	Returns LOCKACQUIRE_OK if we get the lock,
 *	        LOCKACQUIRE_NOT_AVAIL if we don't want to take the lock after all.
 *
 *	Analogous to LockAcquire, but the lockmode and session boolean are not
 *	required in the function prototype as we are *always* lockmode ExclusiveLock
 *	and have no session locks. 
 *
 *	The semantics of resource locks mean that lockmode has minimal meaning -
 *	the conflict rules are determined by the state of the counters of the
 *	corresponding queue. We are maintaining the lock lockmode and related
 *	elements (holdmask etc), in order to ease comparison with standard locks
 *	at deadlock check time (well, so we hope anyway.)
 *
 * The "locktag" here consists of the queue-id and the "lockmethod" of
 * "resource-queue" and an identifier specifying that this is a
 * resource-locktag.
 *
 */
LockAcquireResult
ResLockAcquire(LOCKTAG *locktag, ResPortalIncrement *incrementSet)
{
	LOCKMODE		lockmode = ExclusiveLock;
	LOCK			*lock;
	PROCLOCK		*proclock;
	PROCLOCKTAG		proclocktag;
	LOCALLOCKTAG	localtag;
	LOCALLOCK		*locallock;
	uint32			hashcode;
	uint32			proclock_hashcode;
	int				partition;
	LWLockId		partitionLock;
	bool			found;
	ResourceOwner	owner;
	ResQueue		queue;
	int				status;

	/* Setup the lock method bits. */
	Assert(locktag->locktag_lockmethodid == RESOURCE_LOCKMETHOD);

	/* Provide a resource owner. */
	owner = CurrentResourceOwner;

	/*
	 * Find or create a LOCALLOCK entry for this lock and lockmode
	 */
	MemSet(&localtag, 0, sizeof(localtag));     /* must clear padding */
	localtag.lock = *locktag;
	localtag.mode = lockmode;

	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
										  (void *) &localtag,
										  HASH_ENTER, &found);

	/*
	 * if it's a new locallock object, initialize it, if it already exists
	 * then that is enough for the resource locks.
	 */
	if (!found)
	{
		locallock->lock = NULL;
		locallock->proclock = NULL;
		locallock->hashcode = LockTagHashCode(&(localtag.lock));
		locallock->nLocks = 0;
		locallock->numLockOwners = 0;
		locallock->maxLockOwners = 8;
		locallock->lockOwners = NULL;
		locallock->lockOwners = (LOCALLOCKOWNER *)
			MemoryContextAlloc(TopMemoryContext, locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
	}

	/* We are going to examine the shared lock table. */
	hashcode = locallock->hashcode;
	partition = LockHashPartition(hashcode);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * 	Find or create a lock with this tag.
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_ENTER_NULL,
												&found);
	locallock->lock = lock;
	if (!lock)
	{
	 	LWLockRelease(partitionLock);
		ereport(ERROR,
			  	(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
				 errhint("You may need to increase max_resource_qeueues.")));
	}

	/*
	 * if it's a new lock object, initialize it.
	 */
	if (!found)
	{
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->procLocks));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
	}
	else
	{
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}
	
	/*
	 *  Create the hash key for the proclock table.
	 */
	MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));	/* Clear padding.*/
	proclocktag.myLock = lock;
	proclocktag.myProc = MyProc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	/*
	 * Find or create a proclock entry with this tag. 
	 */ 
	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_ENTER_NULL,
														&found);
	locallock->proclock = proclock;
    if (!proclock)
	{
		/* Not enough shmem for the proclock. */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
			if (!hash_search_with_hash_value(LockMethodLockHash,
											 (void *) &(lock->tag),
											 hashcode,
											 HASH_REMOVE,
											 NULL))
				elog(PANIC, "lock table corrupted");
		}
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
				 errhint("You may need to increase max_resource_qeueues.")));
	}

	/*
	 * If new, initialize the new entry.
	 */
	if (!found)
	{
		proclock->holdMask = 0;
		proclock->releaseMask = 0;
		/* Add proclock to appropriate lists */
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
		SHMQueueInsertBefore(&(MyProc->myProcLocks[partition]), &proclock->procLink);
		proclock->nLocks = 0;
		SHMQueueInit(&(proclock->portalLinks));
	}
	else
	{
		Assert((proclock->holdMask & ~lock->grantMask) == 0);
		/* Could do a deadlock risk check here. */
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those immediately.
	 * The other counts don't increment till we get the lock.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);

	/* Look up existing queue */
	PG_TRY();
	{
		queue = GetResQueueFromLock(lock);
	}
	PG_CATCH();
	{
		/* Something wrong happened - our RQ is gone. Release all locks and clean out */
		LWLockReleaseAll();
		PG_RE_THROW();
	}
	PG_END_TRY();

	/*
	 * If the query cost is smaller than the ignore cost limit for this queue
	 * then don't try to take a lock at all.
	 */
	if (incrementSet->increments[RES_COST_LIMIT] < queue->ignorecostlimit)
	{
		/* Decrement requested. */
		lock->nRequested--;
		lock->requested[lockmode]--;
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));

		/* Clean up this lock. */
        if (proclock->nLocks == 0)
		    RemoveLocalLock(locallock);

		ResCleanUpLock(lock, proclock, hashcode, false);

		LWLockRelease(ResQueueLock);
		LWLockRelease(partitionLock);

		/*
		 * To avoid queue accounting problems, we will need to reset the
		 * queueId and portalId for this portal *after* returning from
		 * here.
		 */
		return LOCKACQUIRE_NOT_AVAIL;
	}

	/*
	 * Otherwise, we are going to take a lock, Add an increment to the 
	 * increment hash for this process.
	 */
	incrementSet = ResIncrementAdd(incrementSet, proclock, owner);
	if (!incrementSet)
	{
		LWLockRelease(ResQueueLock);
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory adding portal increments"),
				 errhint("You may need to increase max_resource_portals_per_transaction.")));
	}

	/*
	 * Check if the lock can be acquired (i.e. if the resource the lock and 
	 * queue control is not exhausted). 
	 */
	status = ResLockCheckLimit(lock, proclock, incrementSet, true);
	if (status == STATUS_ERROR)
	{
		/*
		 * The requested lock has individual increments that are larger than
		 * some of the thresholds for the corrosponding queue, and overcommit
		 * is not enabled for them. So abort and clean up.
		 */
		ResPortalTag		portalTag;

		/* Adjust the counters as we no longer want this lock. */
		lock->nRequested--;
		lock->requested[lockmode]--;
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));

		/* Clean up this lock. */
        if (proclock->nLocks == 0)
    		RemoveLocalLock(locallock);

		ResCleanUpLock(lock, proclock, hashcode, false);

		/* Kill off the increment. */
		MemSet(&portalTag, 0, sizeof(ResPortalTag));
		portalTag.pid = incrementSet->pid;
		portalTag.portalId = incrementSet->portalId;

		ResIncrementRemove(&portalTag);

		LWLockRelease(ResQueueLock);
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 errmsg("statement requires more resources than resource queue allows")));
	}
	else if (status ==  STATUS_OK)
	{
		/* 
		 * The requested lock will *not* exhaust the limit for this resource
		 * queue, so record this in the local lock hash, and grant it.
		 */
		ResGrantLock(lock, proclock);
		ResLockUpdateLimit(lock, proclock, incrementSet, true, false);

		LWLockRelease(ResQueueLock);

		/* Note the start time for queue statistics. */
		pgstat_record_start_queue_exec(incrementSet->portalId,
									   locktag->locktag_field1);
	}
	else
	{
		Assert(status == STATUS_FOUND);

		/*
		 * The requested lock will exhaust the limit for this resource queue,
		 * so must wait.
		 */

		/* Set bitmask of locks this process already holds on this object. */
		MyProc->heldLocks = proclock->holdMask; /* Do we need to do this?*/
		
		/* 
		 * Set the portal id so we can identify what increments we are wanting
		 * to apply at wakeup.
		 */
		MyProc->waitPortalId = incrementSet->portalId;

		LWLockRelease(ResQueueLock);

		/* Note count and wait time for queue statistics. */
		pgstat_count_queue_wait(incrementSet->portalId,
								locktag->locktag_field1);
		pgstat_record_start_queue_wait(incrementSet->portalId,
									   locktag->locktag_field1);

		/*
		 * Free/destroy idle gangs as we are going to sleep.
		 */
		if (ResourceCleanupIdleGangs)
		{
			cleanupIdleReaderGangs();
		}

		/*
		 * Sleep till someone wakes me up.
		 */
		ResWaitOnLock(locallock, owner, incrementSet);

		/*
		 * Have been awakened, check state is consistent.
		 */
		if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
		{
			LWLockRelease(partitionLock);
			elog(ERROR, "ResLockAcquire failed");
		}

		/* Reset the portal id. */
		MyProc->waitPortalId = INVALID_PORTALID;

		/* End wait time and start execute time statistics for this queue. */
		pgstat_record_end_queue_wait(incrementSet->portalId,
									 locktag->locktag_field1);
		pgstat_record_start_queue_exec(incrementSet->portalId,
									   locktag->locktag_field1);
	}

	/* Release the  partition lock. */
	LWLockRelease(partitionLock);

	return LOCKACQUIRE_OK;
}

/*
 * ResLockRelease -- release a resource lock.
 *
 * The "locktag" here consists of the queue-id and the "lockmethod" of
 * "resource-queue" and an identifier specifying that this is a
 * resource-locktag.
 */
bool
ResLockRelease(LOCKTAG *locktag, uint32 resPortalId)
{
	LOCKMODE		lockmode = ExclusiveLock;
	LOCK			*lock;
	PROCLOCK		*proclock;
	LOCALLOCKTAG	localtag;
	LOCALLOCK		*locallock;
	uint32			hashcode;
	LWLockId		partitionLock;
	ResourceOwner	owner;

	ResPortalIncrement	*incrementSet;
	ResPortalTag		portalTag;

	/* Check the lock method bits. */
	Assert(locktag->locktag_lockmethodid == RESOURCE_LOCKMETHOD);

	/* Provide a resource owner. */
	owner = CurrentResourceOwner;

	/*
	 * Find the LOCALLOCK entry for this lock and lockmode
	 */
	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
	localtag.lock = *locktag;
	localtag.mode = lockmode;

	locallock = (LOCALLOCK *)
		hash_search(LockMethodLocalHash, (void *) &localtag, HASH_FIND, NULL);

	/*
     * If the lock request did not get very far, cleanup is easy.
	 */
	if (!locallock ||
        !locallock->lock ||
        !locallock->proclock)
	{
        elog(LOG, "Resource queue %d: no lock to release", locktag->locktag_field1);
        if (locallock)
		{
            RemoveLocalLock(locallock);
		}

		return false;
	}

	hashcode = locallock->hashcode;

	/* We are going to examine the shared lock table. */
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

    /*
     * Verify that our LOCALLOCK still matches the shared tables.
     *
     * While waiting for the lock, our request could have been canceled 
     * to resolve a deadlock.  It could already have been removed from
     * the shared LOCK and PROCLOCK tables, and those entries could 
     * have been reallocated for some other request.  Then all we need
     * to do is clean up the LOCALLOCK entry.
     */
	lock = locallock->lock;
	proclock = locallock->proclock;
    if (proclock->tag.myLock != lock ||
        proclock->tag.myProc != MyProc ||
        memcmp(&locallock->tag.lock, &lock->tag, sizeof(lock->tag)) != 0)
    {
		LWLockRelease(partitionLock);
        elog(DEBUG1, "Resource queue %d: lock already gone", locktag->locktag_field1);
        RemoveLocalLock(locallock);

		return false;
    }

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * Release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)) || proclock->nLocks <= 0)
	{
		LWLockRelease(partitionLock);
        elog(DEBUG1, "Resource queue %d: proclock not held", locktag->locktag_field1);
        RemoveLocalLock(locallock);

		return false;			
	}

	/*
	 * Find the increment for this portal and process.
	 */
	MemSet(&portalTag, 0, sizeof(ResPortalTag));
	portalTag.pid = MyProc->pid;
	portalTag.portalId = resPortalId;
	
	LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);

	incrementSet = ResIncrementFind(&portalTag);
	if (!incrementSet)
	{
        elog(DEBUG1, "Resource queue %d: increment not found on unlock", locktag->locktag_field1);
        if (proclock->nLocks == 0)
		{
            RemoveLocalLock(locallock);
		}

		ResCleanUpLock(lock, proclock, hashcode, true);
		LWLockRelease(ResQueueLock);
		LWLockRelease(partitionLock);
		return false;			
	}

	/*
	 * Un-grant the lock.
	 */
	ResUnGrantLock(lock, proclock);
	ResLockUpdateLimit(lock, proclock, incrementSet, false, false);

	/*
	 * Perform clean-up, waking up any waiters!
	 */
    if (proclock->nLocks == 0)
		RemoveLocalLock(locallock);

	ResCleanUpLock(lock, proclock, hashcode, true);

	/* 
	 * Clean up the increment set. 
	 */
	if (!ResIncrementRemove(&portalTag))
	{
		LWLockRelease(ResQueueLock);
		LWLockRelease(partitionLock);

		elog(ERROR, "no increment to remove for portal id %u and pid %d", resPortalId, MyProc->pid);
		/* not reached */
	}

	LWLockRelease(ResQueueLock);
	LWLockRelease(partitionLock);

	/* Update execute statistics for this queue, count and elapsed time. */
	pgstat_count_queue_exec(resPortalId, locktag->locktag_field1);
	pgstat_record_end_queue_exec(resPortalId, locktag->locktag_field1);
		
	return true;
}


/*
 * ResLockCheckLimit -- test whether the given process acquiring the this lock 
 *	will cause a resource to exceed its limits.
 * 
 * Notes:
 *	Returns STATUS_FOUND if limit will be exhausted, STATUS_OK if not.
 *
 *	If increment is true, then the resource counter associated with the lock 
 *	is to be incremented, if false then decremented.
 *
 *	Named similarly to the LockCheckconflicts() for standard locks, but it is
 *	not checking a table of lock mode conflicts, but whether a shared counter
 *	for some resource is exhausted.
 *
 *	The resource queue lightweight lock (ResQueueLock) must be held while
 *	this function is called.
 *
 * MPP-4340: modified logic so that we return STATUS_OK when
 * decrementing resource -- decrements shouldn't care, let's not stop
 * them from freeing resources!
 */
int
ResLockCheckLimit(LOCK *lock, PROCLOCK *proclock, ResPortalIncrement *incrementSet, bool increment)
{
	ResQueue		queue;
	ResLimit		limits;
	bool			over_limit = false;
	bool			will_overcommit = false;
	int				status = STATUS_OK;
	Cost			increment_amt;
	int				i;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));

	/* Get the queue for this lock.*/
	queue = GetResQueueFromLock(lock);
	limits = queue->limits;
	
	for (i = 0; i < NUM_RES_LIMIT_TYPES; i++)
	{
		/*
		 * Skip the default threshold, as it means 'no limit'.
		 */
		if (limits[i].threshold_value == INVALID_RES_LIMIT_THRESHOLD)
			continue;
			
		switch (limits[i].type)
		{
			case RES_COUNT_LIMIT:
			{
				Assert((limits[i].threshold_is_max));

				/* Setup whether to increment or decrement the # active. */
				if (increment)
				{
					increment_amt = incrementSet->increments[i];

					if (limits[i].current_value + increment_amt > limits[i].threshold_value)
						over_limit = true;
				}
				else
				{
					increment_amt = -1 * incrementSet->increments[i];
				}

#ifdef RESLOCK_DEBUG
				elog(DEBUG1, "checking count limit threshold %.0f current %.0f",
					 limits[i].threshold_value, limits[i].current_value);
#endif
			}
			break;

			case RES_COST_LIMIT:
			{
				Assert((limits[i].threshold_is_max));

				/* Setup whether to increment or decrement the cost. */
				if (increment)
				{
					increment_amt = incrementSet->increments[i];

					/* Check if this will overcommit */
					if (increment_amt > limits[i].threshold_value)
						will_overcommit = true;

					if (queue->overcommit)
					{
						/*
						 * Autocommit is enabled, allow statements that
						 * blowout the limit if noone else is active!
						 */
						if ((limits[i].current_value + increment_amt > limits[i].threshold_value) &&
							(limits[i].current_value > 0.1))
							over_limit = true;
					} 
					else
					{
						/*
						 * No autocommit, so always fail statements that
						 * blowout the limit.
						 */
						if (limits[i].current_value + increment_amt > limits[i].threshold_value)
							over_limit = true;
					}
				}
				else
				{
					increment_amt = -1 * incrementSet->increments[i];
				}

#ifdef RESLOCK_DEBUG
				elog(DEBUG1, "checking cost limit threshold %.2f current %.2f",
					 limits[i].threshold_value, limits[i].current_value);
#endif
			}
			break;

			case RES_MEMORY_LIMIT:
			{
				Assert((limits[i].threshold_is_max));

				/* Setup whether to increment or decrement the # active. */
				if (increment)
				{
					increment_amt = incrementSet->increments[i];

					if (limits[i].current_value + increment_amt > limits[i].threshold_value)
						over_limit = true;
				}
				else
				{
					increment_amt = -1 * incrementSet->increments[i];
				}

#ifdef RESLOCK_DEBUG
				elog(DEBUG1, "checking memory limit threshold %.0f current %.0f",
					 limits[i].threshold_value, limits[i].current_value);
#endif
			}
			break;
			
			default:
				break;		
		}
	}

	if (will_overcommit && !queue->overcommit)
		status = STATUS_ERROR;
	else if (over_limit)
		status = STATUS_FOUND;

	return status;

}


/*
 * ResLockUpdateLimit -- update the resource counter for this lock with the
 *	increment for the process.
 *
 * Notes:
 *	If increment is true, then the resource counter associated with the lock 
 *	is to be incremented, if false then decremented.
 *
 * Warnings:
 *	The resource queue lightweight lock (ResQueueLock) must be held while
 *	this function is called.
 */
void
ResLockUpdateLimit(LOCK *lock, PROCLOCK *proclock, ResPortalIncrement *incrementSet, bool increment, bool inError)
{
	ResQueue 		queue;
	ResLimit		limits;
	Cost			increment_amt;
	int				i;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));

	/* Get the queue for this lock.*/
	queue = GetResQueueFromLock(lock);
	limits = queue->limits;

	for (i = 0; i < NUM_RES_LIMIT_TYPES; i++)
	{

		/*
		 * MPP-8454: NOTE that if our resource-queue has been modified
		 * since we locked our resources, on unlock it is possible
		 * that we're deducting an increment that we never added --
		 * the lowest value we should allow is 0.0.
		 *
		 */
		switch (limits[i].type)
		{
			case RES_COUNT_LIMIT:
			case RES_COST_LIMIT:
			case RES_MEMORY_LIMIT:
			{
				Cost new_value;

				Assert((limits[i].threshold_is_max));
				/* setup whether to increment or decrement the # active. */
				if (increment)
				{
					increment_amt = incrementSet->increments[i];
				}
				else
				{
					increment_amt = -1 * incrementSet->increments[i];
				}

				new_value = ceil(limits[i].current_value + increment_amt);
				new_value = Max(new_value, 0.0);

				limits[i].current_value = new_value;
			}
			break;

			default:
				break;
		}
	}

	return;
}

/*
 * GetResQueueFromLock -- find the resource queue for a given lock;
 *
 * Notes:
 *	should be handed a locktag containing a valid queue id.
 * 	should hold the resource queue lightweight lock during this operation
 */
ResQueue
GetResQueueFromLock(LOCK *lock)
{	
	Assert(LWLockHeldExclusiveByMe(ResQueueLock));

	ResQueue	queue = ResQueueHashFind(GET_RESOURCE_QUEUEID_FOR_LOCK(lock));

	if (queue == NULL)
	{
		elog(ERROR, "cannot find queue id %d", GET_RESOURCE_QUEUEID_FOR_LOCK(lock));
	}

	return queue;
}

/*
 * ResGrantLock -- grant a resource lock. 
 *
 * Warnings:
 *	It is expected that the partition lock is held before calling this 
 *  function, as the various shared queue counts are inspected.
 */
static void
ResGrantLock(LOCK *lock, PROCLOCK *proclock)
{	
	LOCKMODE	lockmode = ExclusiveLock;

	/* Update the standard lock stuff, for locks and proclocks. */
	lock->nGranted++;  
	lock->granted[lockmode]++;
	lock->grantMask |= LOCKBIT_ON(lockmode);
	if (lock->granted[lockmode] == lock->requested[lockmode])
	{
		lock->waitMask &= LOCKBIT_OFF(lockmode);	/* no more waiters.*/

	}
	proclock->holdMask |= LOCKBIT_ON(lockmode);

	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);

	/* Update the holders count. */
	proclock->nLocks++;

	return;
}

/*
 * ResUnGrantLock --  opposite of ResGrantLock.
 *
 * Notes:
 *	The equivalant standard lock function returns true only if there are waiters,
 *	we don't do this.
 *
 * Warnings:
 *	It is expected that the partition lock held before calling this 
 *  function, as the various shared queue counts are inspected.
 */
bool
ResUnGrantLock(LOCK *lock, PROCLOCK *proclock)
{
	LOCKMODE	lockmode = ExclusiveLock;

	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);

	/* Update the standard lock stuff. */
	lock->nRequested--;
	lock->requested[lockmode]--;
	lock->nGranted--;
	lock->granted[lockmode]--;

	if (lock->granted[lockmode] == 0)
	{
		/* change the conflict mask.  No more of this lock type. */
		lock->grantMask &= LOCKBIT_OFF(lockmode);
	}

	/* Update the holders count. */
	proclock->nLocks--;

	/* Fix the per-proclock state. */
	if (proclock->nLocks == 0)
	{
		proclock->holdMask &= LOCKBIT_OFF(lockmode);
	}

	return true;
}


/*
 * ResCleanUpLock -- lock cleanup, remove entry from lock queues and start
 *	waking up waiters.
 *
 * MPP-6055/MPP-6144: we get called more than once; if we've already cleaned
 * up, don't walk off the end of lists; or panic when we can't find our hashtable
 * entries.
 */
static void
ResCleanUpLock(LOCK *lock, PROCLOCK *proclock, uint32 hashcode, bool wakeupNeeded)
{
	Assert(LWLockHeldExclusiveByMe(ResQueueLock));

	/*
	 * If this was my last hold on this lock, delete my entry in the proclock
	 * table.
	 */
	if (proclock->holdMask == 0 && proclock->nLocks == 0)
	{
		uint32		proclock_hashcode;

		if (proclock->lockLink.next != INVALID_OFFSET)
			SHMQueueDelete(&proclock->lockLink);

		if (proclock->procLink.next != INVALID_OFFSET)
			SHMQueueDelete(&proclock->procLink);

		proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
		hash_search_with_hash_value(LockMethodProcLockHash, (void *) &(proclock->tag),
									proclock_hashcode, HASH_REMOVE, NULL);
	}

	if (lock->nRequested == 0)
	{
		/*
		 * The caller just released the last lock, so garbage-collect the lock
		 * object.
		 */
		Assert(SHMQueueEmpty(&(lock->procLocks)));

		hash_search(LockMethodLockHash, (void *) &(lock->tag), HASH_REMOVE, NULL);
	}

	/*
	 * If appropriate, awaken any waiters.
	 */
	if (wakeupNeeded)
	{
		ResProcLockRemoveSelfAndWakeup(lock);
	}
	
	return;
}


/*
 * WaitOnResLock -- wait to acquire a resource lock.
 *
 * 
 * Warnings:
 *	It is expected that the partition lock is held before calling this 
 *  function, as the various shared queue counts are inspected.
 */
static void
ResWaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, ResPortalIncrement *incrementSet)
{

	uint32      hashcode = locallock->hashcode;
	LWLockId    partitionLock = LockHashPartitionLock(hashcode);
	const char	*old_status;
	char		*new_status = NULL;
	int			len;

	/* Report change to waiting status */
	if (update_process_title)
	{
		old_status = get_ps_display(&len);
		new_status = (char *) palloc(len + 8 + 1);
		memcpy(new_status, old_status, len);
		strcpy(new_status + len, " queuing");
		set_ps_display(new_status, false);	/* truncate off " queuing" */
		new_status[len] = '\0';
	}
	pgstat_report_waiting(PGBE_WAITING_LOCK);

	awaitedLock = locallock;
	awaitedOwner = owner;

	/*
	 * Now sleep.
	 *
	 * NOTE: self-deadlocks will throw (do a non-local return).
	 */
	if (ResProcSleep(ExclusiveLock, locallock, incrementSet) != STATUS_OK)
	{
		/*
		 * We failed as a result of a deadlock, see CheckDeadLock(). Quit now.
		 */
		LWLockRelease(partitionLock);
		DeadLockReport();
	}

	awaitedLock = NULL;

	/* Report change to non-waiting status */
	if (update_process_title)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}
	pgstat_report_waiting(PGBE_WAITING_NONE);

	return;
}


/*
 * ResProcLockRemoveSelfAndWakeup -- awaken any processses waiting on a resource lock.
 *
 * Notes:
 *  It always remove itself from the waitlist.
 *	Need to only awaken enough as many waiters as the resource controlled by 
 *	the the lock should allow!
 */
void
ResProcLockRemoveSelfAndWakeup(LOCK *lock)
{
	PROC_QUEUE	*waitQueue = &(lock->waitProcs);
	int			queue_size = waitQueue->size;
	PGPROC		*proc;
	uint32		hashcode;
	LWLockId	partitionLock;

	int			status;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));

	/*
	 * XXX: This code is ugly and hard to read -- it should be a lot
	 * simpler, especially when there are some odd cases (process
	 * sitting on its own wait-queue).
	 */

	Assert(queue_size >= 0);
	if (queue_size == 0)
	{
		return;
	}

	proc = (PGPROC *) MAKE_PTR(waitQueue->links.next);

	while (queue_size-- > 0)
	{
		/*
		 * Get the portal we are waiting on, and then its set of increments.
		 */
		ResPortalTag			portalTag;
		ResPortalIncrement		*incrementSet;

		/* Our own process may be on our wait-queue! */
		if (proc->pid == MyProc->pid)
		{
			PGPROC *nextproc;

			nextproc = (PGPROC *)MAKE_PTR(proc->links.next);

			SHMQueueDelete(&(proc->links));
			(proc->waitLock->waitProcs.size)--;

			proc = nextproc;

			continue;
		}

		MemSet(&portalTag, 0, sizeof(ResPortalTag));
		portalTag.pid = proc->pid;
		portalTag.portalId = proc->waitPortalId;

		incrementSet = ResIncrementFind(&portalTag);
		if (!incrementSet)
		{
			hashcode = LockTagHashCode(&(lock->tag));
			partitionLock = LockHashPartitionLock(hashcode);

			LWLockRelease(partitionLock);
			elog(ERROR, "no increment data for  portal id %u and pid %d", proc->waitPortalId, proc->pid);
		}

		/*
		 * See if it is ok to wake this guy. (note that the wakeup
		 * writes to the wait list, and gives back a *new* next proc).
		 */
		status = ResLockCheckLimit(lock, (PROCLOCK *) proc->waitProcLock, incrementSet, true);
		if (status == STATUS_OK)
		{
			ResGrantLock(lock, (PROCLOCK *) proc->waitProcLock);
			ResLockUpdateLimit(lock, (PROCLOCK *) proc->waitProcLock, incrementSet, true, false);

			proc = ResProcWakeup(proc, STATUS_OK);
		}
		else
		{
			/* Otherwise move on to the next guy. */
			proc = (PGPROC *) MAKE_PTR(proc->links.next);
		}
	}

	Assert(waitQueue->size >= 0);
	
	return;
}


/*
 * ResProcWakeup -- wake a sleeping process.
 *
 * (could we just use ProcWakeup here?)
 */
PGPROC *
ResProcWakeup(PGPROC *proc, int waitStatus)
{
	PGPROC		*retProc;

	/* Proc should be sleeping ... */
	if (proc->links.prev == INVALID_OFFSET ||
		proc->links.next == INVALID_OFFSET)
		return NULL;

	/* Save next process before we zap the list link */
	retProc = (PGPROC *) MAKE_PTR(proc->links.next);

	/* Remove process from wait queue */
	SHMQueueDelete(&(proc->links));
	(proc->waitLock->waitProcs.size)--;

	/* Clean up process' state and pass it the ok/fail signal */
	proc->waitLock = NULL;
	proc->waitProcLock = NULL;
	proc->waitStatus = waitStatus;

	/* And awaken it */
	 PGSemaphoreUnlock(&proc->sem);

	return retProc;
}


/*
 * ResRemoveFromWaitQueue -- Remove a process from the wait queue, cleaning up
 *	any locks.
 */
void
ResRemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
{
    LOCK			*waitLock = proc->waitLock;
    PROCLOCK		*proclock = proc->waitProcLock;
    LOCKMODE		lockmode = proc->waitLockMode;
#ifdef USE_ASSERT_CHECKING
    LOCKMETHODID	lockmethodid = LOCK_LOCKMETHOD(*waitLock);
#endif /* USE_ASSERT_CHECKING */
	ResPortalTag	portalTag;

	/* Make sure lockmethod is for a resource lock. */
	Assert(lockmethodid == RESOURCE_LOCKMETHOD);

    /* Make sure proc is waiting */
	Assert(proc->links.next != INVALID_OFFSET);
	Assert(waitLock);
	Assert(waitLock->waitProcs.size > 0);

	/* Remove proc from lock's wait queue */
	SHMQueueDelete(&(proc->links));
	waitLock->waitProcs.size--;

	/* Undo increments of request counts by waiting process */
	Assert(waitLock->nRequested > 0);
	Assert(waitLock->nRequested > proc->waitLock->nGranted);

	waitLock->nRequested--;
	Assert(waitLock->requested[lockmode] > 0);
	waitLock->requested[lockmode]--;

	/* don't forget to clear waitMask bit if appropriate */
	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
		waitLock->waitMask &= LOCKBIT_OFF(lockmode);

	/* Clean up the proc's own state */
	proc->waitLock = NULL;
	proc->waitProcLock = NULL;

	/*
	 * Remove the waited on portal increment.
	 */
	MemSet(&portalTag, 0, sizeof(ResPortalTag));
	portalTag.pid = MyProc->pid;
	portalTag.portalId = MyProc->waitPortalId;

	LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
	ResIncrementRemove(&portalTag);

	/*
	 * Delete the proclock immediately if it represents no already-held 
	 * locks.
	 * (This must happen now because if the owner of the lock decides to
	 * release it, and the requested/granted counts then go to zero,
	 * LockRelease expects there to be no remaining proclocks.) Then see if
     * any other waiters for the lock can be woken up now.
   	 */
   	ResCleanUpLock(waitLock, (PROCLOCK *) proclock, hashcode, true);
	LWLockRelease(ResQueueLock);

}


/*
 * ResCheckSelfDeadLock -- Check to see if I am going to deadlock myself. 
 *
 * What happens here is we scan our own set of portals and total up the
 * increments. If this exceeds any of the thresholds for the queue then
 * we need to signal that a self deadlock is about to occurr - modulo some
 * footwork for overcommit-able queues.
 */
bool
ResCheckSelfDeadLock(LOCK *lock, PROCLOCK *proclock, ResPortalIncrement *incrementSet)
{
	ResQueue		queue;
	ResLimit		limits;
	int				i;
	Cost			incrementTotals[NUM_RES_LIMIT_TYPES];
	int				numPortals = 0;
	bool			countThesholdOvercommitted = false;
	bool			costThesholdOvercommitted = false;
	bool			memoryThesholdOvercommitted = false;
	bool			result = false;

	/* Get the resource queue lock before checking the increments.*/
	LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);

	/* Get the queue for this lock.*/
	queue = GetResQueueFromLock(lock);
	limits = queue->limits;

	/* Get the increment totals and number of portals for this queue. */
	TotalResPortalIncrements(MyProc->pid, queue->queueid, 
							 incrementTotals, &numPortals);
	
	/* 
	 * Now check them against the thresholds using the same logic as
	 * ResLockCheckLimit.
	 */
	for (i = 0; i < NUM_RES_LIMIT_TYPES; i++)
	{
		if (limits[i].threshold_value == INVALID_RES_LIMIT_THRESHOLD)
		{
			continue;
		}

		switch (limits[i].type)
		{
			case RES_COUNT_LIMIT:
			{
				if (incrementTotals[i] > limits[i].threshold_value)
				{
					countThesholdOvercommitted = true;
				}
			}
			break;

			case RES_COST_LIMIT:
			{
				if (incrementTotals[i] > limits[i].threshold_value)
				{
					costThesholdOvercommitted = true;
				}
			}
			
			case RES_MEMORY_LIMIT:
			{
				if (incrementTotals[i] > limits[i].threshold_value)
				{
					memoryThesholdOvercommitted = true;
				}
			}
			break;
		}
	}

	/* If any threshold is overcommitted then set the result.*/
	if (countThesholdOvercommitted || costThesholdOvercommitted || memoryThesholdOvercommitted)
	{
		result = true;
	}

	/*
	 * If the queue can be overcommited and we are overcommitting with
	 * 1 portal and *not* overcommitting the count threshold then
	 * don't trigger a self deadlock.
	 */
	if (queue->overcommit && numPortals == 1 && !countThesholdOvercommitted)
	{
		result = false;
	}

	if (result)
	{
		/*
		 * We're about to abort out of a partially completed lock
		 * acquisition.
		 *
		 * In order to allow our ref-counts to figure out how to
		 * clean things up we're going to "grant" the lock, which
		 * will immediately be cleaned up when our caller throws
		 * an ERROR.
		 */
		if (lock->nRequested > lock->nGranted)
		{
			/* we're no longer waiting. */
			pgstat_report_waiting(PGBE_WAITING_NONE);
			ResGrantLock(lock, proclock);
			ResLockUpdateLimit(lock, proclock, incrementSet, true, true);
		}
		/* our caller will throw an ERROR. */
	}

	LWLockRelease(ResQueueLock);

	return result;
}


/*
 * ResPortalIncrementHashTableInit - Initialize the increment hash.
 *
 * Notes:
 *	This stores the possible increments that a given statement will cause to
 *	be added to the limits for a resource queue.
 *	We allocate one extra slot for each backend, to free us from counting
 *	un-named portals.
 */
bool
ResPortalIncrementHashTableInit(void)
{
	HASHCTL		info;
	long		max_table_size = (MaxResourcePortalsPerXact + 1) * MaxBackends;
	int			hash_flags;

	/* Set key and entry sizes. */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(ResPortalTag);
	info.entrysize = sizeof(ResPortalIncrement);
	info.hash = tag_hash;

	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	ResPortalIncrementHash = ShmemInitHash("Portal Increment Hash",
										   max_table_size / 2,
										   max_table_size,
										   &info,
										   hash_flags);

	if (!ResPortalIncrementHash)
	{
		return false;
	}

	return true;
}


/*
 * ResIncrementAdd -- Add a new increment element to the increment hash.
 *
 * Notes:
 *	Return a pointer to where the new increment is stored, or NULL if we 
 *	are out of memory (or if we find a duplicate portalid).
 *
 *	The resource queue lightweight lock (ResQueueLock) *must* be held for
 *	this operation.
 */
static ResPortalIncrement *
ResIncrementAdd(ResPortalIncrement *incSet, PROCLOCK *proclock, ResourceOwner owner)
{
	ResPortalIncrement	*incrementSet;
	ResPortalTag		portaltag;
	int					i;
	bool				found;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));
	
	/*  Set up the key.*/
	MemSet(&portaltag, 0, sizeof(ResPortalTag));
	portaltag.pid = incSet->pid;
	portaltag.portalId = incSet->portalId;

	/* Add (or find) the value. */
	incrementSet = (ResPortalIncrement *)
		hash_search( ResPortalIncrementHash, (void *)&portaltag, HASH_ENTER_NULL, &found);

	if (!incrementSet)
	{
		return NULL;
	}

	/*  Initialize it. */
	if (!found)
	{
		incrementSet->pid = incSet->pid;
		incrementSet->portalId = incSet->portalId;
		incrementSet->owner = owner;
		incrementSet->isHold = incSet->isHold;
		incrementSet->isCommitted = false;
		for (i = 0 ; i < NUM_RES_LIMIT_TYPES ; i++)
		{
			incrementSet->increments[i] = incSet->increments[i];
		}
		SHMQueueInsertBefore(&proclock->portalLinks, &incrementSet->portalLink);
	}
	else
	{
		/* We have added this portId before - something has gone wrong! */

		elog(WARNING, "duplicate portal id %u for proc %d", incSet->portalId, incSet->pid);
		incrementSet = NULL;
	}

	return incrementSet;
}


/*
 * ResIncrementFind -- Find the increment for a portal and process.
 *
 * Notes
 *	Return a pointer to where the new increment is stored (NULL if not found).
 *
 *	The resource queue lightweight lock (ResQueueLock) *must* be held for
 *	this operation.
 */
ResPortalIncrement *
ResIncrementFind(ResPortalTag *portaltag)
{
	ResPortalIncrement	*incrementSet;
	bool				found;
	
	Assert(LWLockHeldExclusiveByMe(ResQueueLock));

	incrementSet = (ResPortalIncrement *)
		hash_search(ResPortalIncrementHash, (void *)portaltag, HASH_FIND, &found);

	if (!incrementSet)
	{
		return NULL;
	}

	return incrementSet;
}


/*
 * ResIncrementRemove -- Remove a  increment for a portal and process.
 *
 * Notes
 *	The resource queue lightweight lock (ResQueueLock) *must* be held for
 *	this operation.
 */
static bool
ResIncrementRemove(ResPortalTag *portaltag)
{
	ResPortalIncrement	*incrementSet;
	bool				found;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));
	
	incrementSet = (ResPortalIncrement *)
		hash_search(ResPortalIncrementHash, (void *)portaltag, HASH_REMOVE, &found);

	if (incrementSet == NULL)
	{
		return false;
	}

	SHMQueueDelete(&incrementSet->portalLink);

	return true;
}


/*
 * ResQueueHashTableInit -- initialize the hash table of resource queues.
 *
 * Notes:
 */
bool
ResQueueHashTableInit(void)
{
	HASHCTL			info;
	int				hash_flags;

	/* Set key and entry sizes. */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(Oid);
	info.entrysize = sizeof(ResQueueData);
	info.hash = tag_hash;

	hash_flags = (HASH_ELEM | HASH_FUNCTION);

#ifdef RESLOCK_DEBUG
	elog(DEBUG1, "Creating hash table for %d queues", MaxResourceQueues);
#endif

	ResQueueHash = ShmemInitHash("Queue Hash",
								 MaxResourceQueues,
								 MaxResourceQueues,
								 &info,
								 hash_flags);

	if (!ResQueueHash)
		return false;

	return true;
}

/*
 * ResQueuehashNew -- return a new (empty) queue object to initialize.
 *
 * Notes
 *	The resource queue lightweight lock (ResQueueLock) *must* be held for
 *	this operation.
 */
ResQueue
ResQueueHashNew(Oid queueid)
{
	bool				found;
	ResQueueData		*queue;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));
	
	queue = (ResQueueData *)
		hash_search(ResQueueHash, (void *)&queueid, HASH_ENTER_NULL, &found);

	/* caller should test that the queue does not exist already */
	Assert(!found);

	if (!queue)
		return NULL;

	return (ResQueue) queue;
}

/*
 * ResQueueHashFind -- return the queue for a given oid.
 *
 * Notes
 *	The resource queue lightweight lock (ResQueueLock) *must* be held for
 *	this operation.
 */
ResQueue 
ResQueueHashFind(Oid queueid)
{
	bool				found;
	ResQueueData		*queue;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));
	
	queue = (ResQueueData *)
		hash_search(ResQueueHash, (void *)&queueid, HASH_FIND, &found);

	if (!queue)
		return NULL;

	return (ResQueue) queue;
}


/*
 * ResQueueHashRemove -- remove the queue for a given oid.
 *
 * Notes
 *	The resource queue lightweight lock (ResQueueLock) *must* be held for
 *	this operation.
 */
bool 
ResQueueHashRemove(Oid queueid)
{
	bool				found;
	void				*queue;

	Assert(LWLockHeldExclusiveByMe(ResQueueLock));
	
	queue = hash_search(ResQueueHash, (void *)&queueid, HASH_REMOVE, &found);
	if (!queue)
		return false;

	return true;
}

/* Number of columns produced by pg_resqueue_status() */
#define PG_RESQUEUE_STATUS_COLUMNS 5

/*
 * pg_resqueue_status - produce a view with one row per resource queue
 *	showing internal information (counter values, waiters, holders).
 */
Datum
pg_resqueue_status(PG_FUNCTION_ARGS)
{
	FuncCallContext			*funcctx = NULL;
	Datum					result;
	MemoryContext			oldcontext = NULL;
	QueueStatusContext		*fctx = NULL;			/* User function context. */
	HeapTuple				tuple = NULL;

	if (SRF_IS_FIRSTCALL())
	{

		funcctx = SRF_FIRSTCALL_INIT();

		/* Switch context when allocating stuff to be used in later calls */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		fctx = (QueueStatusContext *) palloc(sizeof(QueueStatusContext));
		/*
		 * Allocate space for the per-call area - this overestimates, but
		 * means we can take the resource rescheduler lock after our
		 * memory context switching.
		 */
		fctx->record = (QueueStatusRec *)palloc(sizeof(QueueStatusRec) * MaxResourceQueues);

		funcctx->user_fctx = fctx;

		/* Construct a tuple descriptor for the result rows. */
		TupleDesc tupledesc = CreateTemplateTupleDesc(PG_RESQUEUE_STATUS_COLUMNS, false);
		TupleDescInitEntry(tupledesc, (AttrNumber) 1, "queueid", OIDOID, -1, 0);
		TupleDescInitEntry(tupledesc, (AttrNumber) 2, "queuecountvalue", FLOAT4OID, -1, 0);
		TupleDescInitEntry(tupledesc, (AttrNumber) 3, "queuecostvalue", FLOAT4OID, -1, 0);
		TupleDescInitEntry(tupledesc, (AttrNumber) 4, "queuewaiters", INT4OID, -1, 0);
		TupleDescInitEntry(tupledesc, (AttrNumber) 5, "queueholders", INT4OID, -1, 0);

		funcctx->tuple_desc = BlessTupleDesc(tupledesc);

		/* Return to original context when allocating transient memory */
		MemoryContextSwitchTo(oldcontext);

		/* Get a snapshot of current state of resource queues */
		BuildQueueStatusContext(fctx);
		
		funcctx->max_calls = fctx->numRecords;
	}

	funcctx = SRF_PERCALL_SETUP();

	/* Get the saved state. */
	fctx = funcctx->user_fctx;

	if (funcctx->call_cntr < funcctx->max_calls)
	{
		int			i = funcctx->call_cntr;
		QueueStatusRec	*record = &fctx->record[i];
		Datum		values[PG_RESQUEUE_STATUS_COLUMNS];
		bool		nulls[PG_RESQUEUE_STATUS_COLUMNS];

		values[0] = ObjectIdGetDatum(record->queueid);
		nulls[0] = false;

		/* Make the counters null if the limit is disbaled. */
		if (record->queuecountthreshold != INVALID_RES_LIMIT_THRESHOLD)
		{
			values[1] = Float4GetDatum(record->queuecountvalue);
			nulls[1] = false;
		}
		else
			nulls[1] = true;

		if (record->queuecostthreshold != INVALID_RES_LIMIT_THRESHOLD)
		{
			values[2] = Float4GetDatum(record->queuecostvalue);
			nulls[2] = false;
		}
		else
			nulls[2] = true;
		

		values[3] = record->queuewaiters;
		nulls[3] = false;

		values[4] = record->queueholders;
		nulls[4] = false;
		
		/* Build and return the tuple. */
		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		result = HeapTupleGetDatum(tuple);

		SRF_RETURN_NEXT(funcctx, result);
	}
	else
		SRF_RETURN_DONE(funcctx);
}

/**
 * This copies out the current state of resource queues.
 */
static void BuildQueueStatusContext(QueueStatusContext *fctx)
{
	LWLockId partitionLock;
	int num_calls = 0;
	int partition = 0;
	HASH_SEQ_STATUS		status;
	ResQueueData		*queue = NULL;

	Assert(fctx);
	Assert(fctx->record);

	/* 
	 * Take all the partition locks. This is necessary as we want to
	 * to use the same lock order as the rest of the code - i.e. partition
	 * locks *first* *then* the queue lock (otherwise we could deadlock
	 * ourselves). 
	 */
	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
	{
		partitionLock = FirstLockMgrLock + partition;
		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
	}

	/*
	 * Lock resource queue structures.
	 */
	LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);

	/* Initialize for a sequential scan of the resource queue hash. */
	hash_seq_init(&status, ResQueueHash);
	num_calls = hash_get_num_entries(ResQueueHash);
	Assert(num_calls == ResScheduler->num_queues);

	int i = 0;
	while ((queue = (ResQueueData *) hash_seq_search(&status)) != NULL)
	{
		int			j;
		ResLimit	limits = NULL;
		uint32		hashcode;

		/**
		 * Gather thresholds and current values on activestatements, cost and memory
		 */
		limits = queue->limits;
		
		fctx->record[i].queueid = queue->queueid;

		for (j = 0; j < NUM_RES_LIMIT_TYPES; j++)
		{
			switch (limits[j].type)
			{
				case RES_COUNT_LIMIT:
				{
					fctx->record[i].queuecountthreshold =
						limits[j].threshold_value;

					fctx->record[i].queuecountvalue =
						limits[j].current_value;
				}
				break;

				case RES_COST_LIMIT:
				{
					fctx->record[i].queuecostthreshold =
						limits[j].threshold_value;

					fctx->record[i].queuecostvalue =
						limits[j].current_value;
				}
				break;

				case RES_MEMORY_LIMIT:
				{
					fctx->record[i].queuememthreshold =
						limits[j].threshold_value;

					fctx->record[i].queuememvalue =
						limits[j].current_value;					
				}
				break;
				
				default:
					Assert(false && "Should never reach here!");
			}
		}

		/* 
		 * Get the holders and waiters count for the corresponding resource
		 * lock.
		 */
		LOCKTAG		tag;
		LOCK		*lock;

		SET_LOCKTAG_RESOURCE_QUEUE(tag, queue->queueid);
		hashcode = LockTagHashCode(&tag);

		bool		found = false;

		lock = (LOCK *)
			hash_search_with_hash_value(LockMethodLockHash, (void *)&tag, hashcode, HASH_FIND, &found);

		if (!found || !lock)
		{
			fctx->record[i].queuewaiters = 0;
			fctx->record[i].queueholders = 0;
		}
		else
		{
			fctx->record[i].queuewaiters = lock->nRequested - lock->nGranted;
			fctx->record[i].queueholders = lock->nGranted;
		}

		i++;
		Assert(i <= MaxResourceQueues);
	}

	/* Release the resource scheduler lock. */
	LWLockRelease(ResQueueLock);

	/* ...and the partition locks. */
	for (partition = NUM_LOCK_PARTITIONS; --partition >= 0;)
	{
		partitionLock = FirstLockMgrLock + partition;
		LWLockRelease(partitionLock);
	}

	/* Set the real no. of calls as we know it now! */
	fctx->numRecords = i;
	return;
}

/* Number of records produced per queue. */
#define PG_RESQUEUE_STATUS_KV_RECORDS_PER_QUEUE 8

/* Number of columns produced by function */
#define PG_RESQUEUE_STATUS_KV_COLUMNS 3

/* Scratch space used to write out strings */
#define PG_RESQUEUE_STATUS_KV_BUFSIZE 256

/*
 * pg_resqueue_status_extended - outputs the current state of resource queues in the following format:
 * (queueid, key, value) where key and value are text. This makes the function extremely flexible.
 */
Datum
pg_resqueue_status_kv(PG_FUNCTION_ARGS)
{
	FuncCallContext			*funcctx = NULL;
	Datum					result;
	MemoryContext			oldcontext = NULL;
	QueueStatusContext		*fctx = NULL;			/* User function context. */
	HeapTuple				tuple = NULL;

	if (SRF_IS_FIRSTCALL())
	{

		funcctx = SRF_FIRSTCALL_INIT();

		/* Switch context when allocating stuff to be used in later calls */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		fctx = (QueueStatusContext *) palloc(sizeof(QueueStatusContext));
		/*
		 * Allocate space for the per-call area - this overestimates, but
		 * means we can take the resource rescheduler lock after our
		 * memory context switching.
		 */
		fctx->record = (QueueStatusRec *)palloc(sizeof(QueueStatusRec) * MaxResourceQueues);

		funcctx->user_fctx = fctx;

		/* Construct a tuple descriptor for the result rows. */
		TupleDesc tupledesc = CreateTemplateTupleDesc(PG_RESQUEUE_STATUS_KV_COLUMNS, false);
		TupleDescInitEntry(tupledesc, (AttrNumber) 1, "queueid", OIDOID, -1, 0);
		TupleDescInitEntry(tupledesc, (AttrNumber) 2, "key", TEXTOID, -1, 0);
		TupleDescInitEntry(tupledesc, (AttrNumber) 3, "value", TEXTOID, -1, 0);

		funcctx->tuple_desc = BlessTupleDesc(tupledesc);

		/* Return to original context when allocating transient memory */
		MemoryContextSwitchTo(oldcontext);

		/* Get a snapshot of current state of resource queues */
		BuildQueueStatusContext(fctx);
		
		funcctx->max_calls = fctx->numRecords * PG_RESQUEUE_STATUS_KV_RECORDS_PER_QUEUE; 
	}

	funcctx = SRF_PERCALL_SETUP();

	/* Get the saved state. */
	fctx = funcctx->user_fctx;

	if (funcctx->call_cntr < funcctx->max_calls)
	{
		int			i = funcctx->call_cntr / PG_RESQUEUE_STATUS_KV_RECORDS_PER_QUEUE; /* record number */
		int			j = funcctx->call_cntr % PG_RESQUEUE_STATUS_KV_RECORDS_PER_QUEUE; /* which attribute is being produced */
		QueueStatusRec	*record = &fctx->record[i];
		Datum		values[PG_RESQUEUE_STATUS_KV_COLUMNS];
		bool		nulls[PG_RESQUEUE_STATUS_KV_COLUMNS];
		char		buf[PG_RESQUEUE_STATUS_KV_BUFSIZE]; 
		
		nulls[0] = false;		
		nulls[1] = false;
		nulls[2] = false;

		values[0] = ObjectIdGetDatum(record->queueid);

		switch(j)
		{
			case 0:
			{
				values[1] = PointerGetDatum(cstring_to_text("rsqcountlimit"));
				snprintf(buf, ARRAY_SIZE(buf), "%d", (int) ceil(record->queuecountthreshold));
				values[2] = PointerGetDatum(cstring_to_text(buf));
				break;
			}
			case 1:
			{
				values[1] = PointerGetDatum(cstring_to_text("rsqcountvalue"));
				snprintf(buf, ARRAY_SIZE(buf), "%d", (int) ceil(record->queuecountvalue));
				values[2] = PointerGetDatum(cstring_to_text(buf));
				break;
			}
			case 2:
			{
				values[1] = PointerGetDatum(cstring_to_text("rsqcostlimit"));
				snprintf(buf, ARRAY_SIZE(buf), "%.2f", record->queuecostthreshold);
				values[2] = PointerGetDatum(cstring_to_text(buf));
				break;
			}
			case 3:
			{
				values[1] = PointerGetDatum(cstring_to_text("rsqcostvalue"));
				snprintf(buf, ARRAY_SIZE(buf), "%.2f", record->queuecostvalue);
				values[2] = PointerGetDatum(cstring_to_text(buf));
				break;
			}
			case 4:
			{
				values[1] = PointerGetDatum(cstring_to_text("rsqmemorylimit"));
				snprintf(buf, ARRAY_SIZE(buf), "%.2f", record->queuememthreshold);
				values[2] = PointerGetDatum(cstring_to_text(buf));
				break;
			}
			case 5:
			{
				values[1] = PointerGetDatum(cstring_to_text("rsqmemoryvalue"));
				snprintf(buf, ARRAY_SIZE(buf), "%.2f", record->queuememvalue);
				values[2] = PointerGetDatum(cstring_to_text(buf));
				break;
			}
			case 6:
			{
				values[1] = PointerGetDatum(cstring_to_text("rsqwaiters"));
				snprintf(buf, ARRAY_SIZE(buf), "%d", record->queuewaiters);
				values[2] = PointerGetDatum(cstring_to_text(buf));
				break;
			}
			case 7:
			{
				values[1] = PointerGetDatum(cstring_to_text("rsqholders"));
				snprintf(buf, ARRAY_SIZE(buf), "%d", record->queueholders);
				values[2] = PointerGetDatum(cstring_to_text(buf));
				break;
			}
			default:
				Assert(false && "Cannot reach here");
		}
				
		/* Build and return the tuple. */
		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		result = HeapTupleGetDatum(tuple);

		SRF_RETURN_NEXT(funcctx, result);
	}
	else
		SRF_RETURN_DONE(funcctx);
}


#ifdef USE_ASSERT_CHECKING
/**
 * Checks that in-memory data-structures are consistent with the catalog table.
 * This operation can only be performed when we have the ResQueueLock.
 */
void AssertMemoryLimitsMatch(void)
{
	Assert(LWLockHeldExclusiveByMe(ResQueueLock));
	
	HASH_SEQ_STATUS status;
	
	/* Initialize for a sequential scan of the resource queue hash. */
	hash_seq_init(&status, ResQueueHash);

	ResQueue queue = NULL;
	while ((queue = (ResQueue) hash_seq_search(&status)) != NULL)
	{
		Oid queueId = queue->queueid;
		Assert(queueId != InvalidOid);
		double v1 = ceil(queue->limits[RES_MEMORY_LIMIT].threshold_value);
		double v2 = ceil((double) ResourceQueueGetMemoryLimitInCatalog(queueId));
				
		if (gp_log_resqueue_memory)
		{
			elog(gp_resqueue_memory_log_level, "Memory limit for queue %d is %.0f in catalog and %.0f in shared mem", queueId, v2, v1);
		}
		
		AssertImply(gp_resqueue_memory_policy != RESQUEUE_MEMORY_POLICY_NONE,
				v1 == v2 && "Resource queue / shared structure do not match on memory limit in policy AUTO.");
	}
	
	return;
}
#endif
